查看原文
其他

消息队列利器—MQProxy架构设计

吴迪老师 好未来技术 2023-03-15

背景:原来部门内没有专门的消息队列服务团队,且不同的研发团队使用的语言不同,在使用消息队列服务时,还需要各个研发团队关注底层消息队列系统的选型、协议以及使用方式,经常由于稳定性和容错机制等问题,导致业务服务受到影响。基础服务中台MQ团队基于kafka官方的提供java客户端,自研了分布式消息中间件MQProxy。用户可以通过简易的SDK轻松地接入MQProxy进行生产和消费,使用丰富、快捷、可靠的消息服务。

01

设计目标

MQProxy主要目标是让研发人员能够方便的使用消息队列服务。研发人员可专注业务编码,无需了解以下内容:
  • 底层消息队列系统的选型;
  • 底层消息队列系统的协议;
  • 底层消息队列系统的使用方式;
  • 底层消息队列系统的健康状况与容错机制;

03

已实现功能

为实现上述目标,MQProxy负责完成的基础功能包括:
  • 对常见的开源消息队列系统选型
  • 统一使用研发人员更熟悉的HTTP协议、TCP协议
  • 提供简单的produce/consume/commit API
  • 关注消息队列系统的健康状况,实现必要的容错与可靠性
在上述功能完成的基础上,MQProxy目前还提供了以下高级功能:
  • 延迟队列:用于发送延时类请求,如:订单支付和消息推送等场景;
  • 死信队列:用于保存用户消费失败的消息,以备查询和再次消费;
  • 消息审计与Trace:目前依赖于网关日志和服务打点进行消息查询,跟踪,后续会通过日志采集来完成全链路跟踪;
  • 超时重投与主动重投:根据需求,将用户在规定时间内消费但是没有提交的消息进行重新投递,用户也可以通过操作界面对死信消息进行主动投递;
  • 消息回调:将用户发送到kafka中的数据主动投递给用户。避免用户在无消息时因空轮询耗费资源;
  • 无限消费者协同:区别阿里云的kafka,自建kafka提供了消费者组自动注册功能,用户可以通过此功能临时创建多消费者组来实现多节点的消息广播,无需提前创建消费者组,代理也会定时清理长期不使用的临时消费者组;
  • 机房多活:用户可以通过消费者代理同时消费两个机房同一个topic中的数据;
  • 多语言SDK:提供php,golang,java的SDK,其他语言会在后续的研发中陆续推出。

03

MQProxy逻辑架构

MQProxy首先选用的底层消息队列服务为Apache Kafka,后期也可以支持其他消息队列服务(如RocketMQ、Pulsar等)。服务端主要分为3个重要模块,分别是生产者模块、消费者模块、和一个辅助模块——延时队列模块。下图为mqproxy整体的逻辑架构:
  • 生产者代理和消费者代理通过http协议,经过nginx网关服务于用户
  • 生产者代理借助“延时队列模块”来发送延时消息。
  • 消费者代理借助“位移合并模块”来解决多节点位移提交等问题。
  • “备份kafka”用来在“业务kafka”发生故障时作为临时存储。
  • mapdb为本地数据库,作用是在生产者代理向kafka代理发送前临时存储消息,避免代理发生宕机时丢失消息。
  • mysql和clickhouse分别用来存储集群状态和死信消息。
  • 为方便用户使用,代理提供了php,go等客户端,并使用普罗米修斯和grafana监控自身和业务的健康状态。
下面分别阐述下服务端3个重要模块的关键设计4.1 ProducerProxy设计ProducerProxy主要解决了当KafkaServer发生故障时,消息无法正常投递问题。主要通过本地数据存储功能和异地切换功能,完成了kafka灾备的处理方案,保障消息的不丢失,提升了消息队列服务整体的可用性。下面从kafka灾备方案、ProducerProxy重试和消息保障机制、扩容方案三个方面进行介绍。1.如何实现kafka灾备?生产者代理通过和kafka之间的心跳来判断kafka的健康状态,如果kafka发生故障,代理会把消息发往备份的kafka中。备份kafka代理会不断尝试将消息重新发往业务kafka。
2.如何判断kafka的健康状态?最初代理通过hystrix熔断器将发往“业务kafka”超时的消息重新发送到“备份kafka”,但是这样做有一个弊端,就是当发送线程被熔断过程后,“业务kafka”如果恢复正常,“处于发送缓存区中的消息”由于java客户端的等待机制仍然会被发送到“业务kafka”,这样会导致消息重复的现象。经过慎重选择,代理使用adminclient作为心跳线程来定时获取“业务kafka”和“备份kafka”的元数据,当代理多次获取不到“业务kafka”的元数据时,决策器会断定代理和“业务kafka”连接发生异常,将发送通道切换至“备份kafka”,直至代理和“业务kafka”之间的连接恢复正常。3.如何保障代理宕机或重启时,消息不丢失?重试机制:如果mqproxy没有响应用户发送请求或者发送失败,sdk会尝试投递消息给其他活跃节点。磁盘存储:mqproxy在将消息发送到kafka前,总会把消息存储至本地数据库mapdb做磁盘存储。如果消息发送成功,则会通过回调删除到本地mapdb中的消息,以防止磁盘暴涨。4.生产代理怎样扩容?生产者整体上是无状态的模型,所以可以在kafka承受范围内,无限进行横向扩容,目前单个生产者代理节点QPS在10000~15000之间是稳定的,生产环境上单个节点最大可以承受20000左右QPS,如果吞吐量不满足要求,可以通过增加节点来满足业务需求。下图为生产者代理的架构图:
4.2 ConsumerProxy设计ConsumerProxy基于java高性能分布式框架dubbo,实现了高吞吐强一致性的无状态服务。
ConsumerProxy的逻辑架构图如下:
消息拉取的总体流程:step1:server集群将消息从kafka预拉取消息到本地的内存中,并将自身代理的topic注册到zookeeper上;step2:portal节点获取注册到zookeeper上的server节点信息。当任意portal节点收到用户指定topic消费请求时,portal可以准确的向server集群指定节点发起消费请求;step3:portal节点将从server节点拉取的消息返回给用户;step4:用户通过网关向portal的任意节点提交消息位移;step5:通过zookeeper,portal节点将收到的位移提交请求准确的提交给server节点,server节点收到提交请求后删除内存中的消息;step6:portal节点将收到的位移提交请求同时投递到kafka的其他topic如:topicA上;step7:位移合并模块通过消费topicA,把提交到topicA上的零散位移进行区间合并;step8:位移合并模块将合并结果提交到topicB中,topicB是一个可压缩类型(即“log compaction”类型)的topic;step9:server节点实时的消费topicB,并记录每个消费实例的位移合并结果。以用来分区再均衡时,对位移进行回溯和过滤。下面就关键的技术点进行详细说明:
  • 如何实现重复消费?
消费者提供重复消费的功能,如果在指定时间窗口内消息没有提交,代理可以把消息重新投递给用户。用户根据自身的业务需求,可以自定义重复消费的次数。达到最大消费次数的消息会投递到死信队列,用户可以通过查看和消费死信队列进行二次消费。ps:server节点核心的数据结构由2个列表组成
  1. 预拉取消息列表;
  2. 已拉取但未提交的消息列表,代理使用java的delayQueue来保存未提交消息。用户从预拉取消息列表中拉取消息后,消息并未真正删除,而是投递到java的delayQueue中进行倒计时,当消息在delayQueue中倒计时结束后,如果仍需重新消费,则将消息重新投递到预拉取的消息列表中,供用户重复消费。
  • 如何保证高吞吐量?
为了保障用户的消费速率,代理采用的是预拉取模式,即预先将消息拉取到mqproxy内存中,用户拉取消息时,可以直接从mqproxy的内存进行拉取消息,不需要和kafka进行交互。但是预拉取模式使得mqproxy变成了一个有状态的模型。
  • 如何消除mqproxy对用户的状态?
代理采用了java高性能分布式框架dubbo,dubbo的“提供者”(server节点)负责从kafka预拉取消息,dubbo的“消费者”(portal节点)负责和用户进行交互。用户可以从portal任意节点接入,portal通过zookeeper可以准确的找到注册的server节点进行消费。这样整个消费者代理对用户来说是无状态模型。
  • 如何实现多线程消费?
用户因为使用mqpoxy的缘故,并不直接和kafka进行通信,业务上可以进行任意的扩缩容,不用关注扩容时分区数不够带来的影响和缩容时可能发生丢消息等问题。但是多节点向mqproxy进行消息拉取,提交消息时并不能保证提交的顺序性,提交的顺序性丢失可能导致代理发生故障时,消息丢失或重复消费。
  • 如何管理用户无序的消息提交?
代理会把用户提交的位移进行区间合并。当发生“分区再均衡”或者代理重新拉起时,mqproxy会根据区间合并结果进行位移回溯,过滤掉提交过的消息,将没提交的消息重新投递给用户。位移回溯过程举例如下:
消费者可以是任意节点,向mqproxy任意节点提交位移,但是这会产生一些位移空洞,如:消费者拉取位移1~10,但是由于消费能力的原因或者坏消息的原因或者用户程序重启等原因,提交的消息位移是[1~7],[9~10],此时代理发生重启或者分区再均衡,代理会把位移回溯到8的位置,8号消息重新投递给用户,并过滤掉9~10 的消息,然后从11号位置开始正常消费。
  • 如何实现高效率的位移合并?
  1. 如果消费者代理对提交的位移进行同步合并,则需要通过锁来保证提交的串行,这大大的降低了代理的吞吐量,也背离了代理多线程消费的初衷。
  2. 如果消费者代理对提交的位移进行异步合并,虽然提升了吞吐量,但当代理发生宕机或重启时,会丢失掉部分的提交数据,造成重复消费。
那么如何在位移提交时既能保证代理的吞吐量,又能保证数据的强一致性?代理借助位移合并模块,当用户提交消息时,代理将消息发送到其他的topic中,由位移合并模块来进行区间合并,并将结果存储到公共介质中,供消费者代理使用。
  • 公共存储介质如何选择?
最初,代理想借助redis来存储消息,但是kafka的高吞吐量可能会给redis带来比较重的负担,并且多个节点同时读写redis又要借助分布式锁,从而降低并发效率。比较之后,代理借助了可压缩类型的topic(Log compaction)来代替redis存储位移合并结果。Log compaction类型的topic压缩原理和代理平时使用的map有些相似,相同key值位移靠后的消息会覆盖掉位移靠前的消息,压缩动作由kafka定时执行。压缩过程如下图所示:
  • 位移合并流程如下
用户提交消息时,代理将用户提交的位移单独打到一个topic-A中,并不阻塞业务,位移合并模使用过kafka的stream流计算客户端消费该topic-A,并进行位移合并操作,再将合并的结果发送到Log compaction这种可压缩类型的topic-B中,kafka会定时的清理topic-B中key值相同的消息(这样做的结果保障topicB中的消息很少,并且最后一个消息为最新的合并结果),key值为一个消费实例(key:topic+group+parition),value值用来存储位移提交合并结果(如[1-7],[9-10]),每次代理重新拉起或者发生分区再均衡的情况时,都会读取topicB中的数据进行位移回溯和消息过滤,通过这种手段代理将位移合并模块从与消费者模块耦合的“有锁低并发原型”转变为与消费者模块解耦的“无锁高并发模型”。整个消费者位移合并过程如下图所示:
4.3 任意时延的延时队列代理设计延时队列代理,是基于固定级别的延时队列实现的任意级别的延时队列。mqproxy生产者代理收到用户发往topic-target的延时请求,会根据消息的延时级别将消息打入到其他topic-delay中。延时队列代理将消费到的延时消息在内存中对消息进行倒计时,倒计时结束后,会把消息重新投递到目标topic-target;
固定时延的延时队列的实现方式:
  • 代理预先在kafka中创建一些常用的固定时延级别的topic,如1s,5s,10s,1m,5m,10m,30m… 用户只能发送这些时延种类的消息。
  • 当生产者代理消费到这些消息时,如:5s延时级别,则会将消息投递到delay-5的topic中。
  • mqproxy将从delay-5中消费到的消息拉取到内存中,在java的delayQueue中进行倒计时。(delayQueue是一种可以按照过期剩余时间排列的优先级队列)
  • 当消息在java在的delayQueue中过期,mqproxy会将这些消息投递给真实的topic,并且向固定时延的topic-5提交位移。
  • 因为在同一个的topic中,所有消息的延时级别是一样的,到期时间会自然而然的在每个parition中按照offset从小到大顺序排列,所以mqproxy向topic-5提交位移也是有序的。
  • 因为提交位移的有序性,所以当mqproxy发生宕机后,不会发生丢消息的现象。
但是同时缺点比较明显:精度较差,系统“钙化”,可扩展性不强;那么基于固定时延的方案,如何来实现任意级别的延时队列,具体实现如下:
  • 代理基于指定级别延时队列来实现任意级别延时队列;
  • 代理预先在kafka中创建好一些topic,如:1s,2s,3s,…,9s,10s,20s,30s,…,90s,…,100s,200s,300s,…,900s,1000s,2000s,3000s,…,9000s,…,10000s,20000s,…,90000s。
  • 当有消息投递过来后,代理会计算消息的延时级别,把他投递到一个指定级别的topic中。
  • 如果在这个级别的消息过期,代理再将它投递到下一个级别的topic中。
举例如下:step1:假设生产者代理在当日01:00:00时收到119s延时级别消息,代理会在这条消息的header上标记消息的真实过期时间(01:01:59),并将这条消息看做成100s延时级别的消息(看作是01:01:40后过期),并把消息发送到事先创建好的100s延时级别的topic中;step2:延时队列代理在(01:00:00消费到该消息),发现消息在01:01:40后过期,则将消息在内存中进行100s倒计时,01:01:40倒计时结束,此时查看header中真实的过期时间(01:01:59),发现过期时间还剩余19s,代理将这条消息看做成10s延时级别的消息(即看做01:01:50后过期),发送到事先创建好的10s延时级别的topic中,并向100s延时级别的topic提交位移;step3:延时队列代理(01:01:40消费到该消息)消费10s延时级别的topic时,消费到该消息,发现消息在01:01:50后过期,会再次放入内存中进行10s倒计时,倒计时结束后(时间在01:01:50),查看header中真实的过期时间(01:01:59),发现过期时间还剩余9s,则将这条消息看做成9s延时级别的消息,发送到事先创建好的9s延时级别的topic中,并向10s延时级别的topic提交位移;step4:延时队列代理(01:01:50消费到该消息)消费9s延时级别的topic时,消费到该消息,会再次放入内存中进行9s倒计时,倒计时结束后(时间在01:01:59),查看header中真实的过期时间(01:01:59),发现消息已经到期,则会把消息投递到真实的topic中,并向9s延时级别的topic提交位移。具体投递流程图如下:

04

未来规划:

随着mqproxy接入的用户越来越多,其系统瓶颈也逐渐显现出来,总结如下:
  • mqproxy各个集群实例分布不均匀:目前我们一个mqproxy集群单独服务一个kafka集群,接入的十几个kafka集群中,有的集群消费实例很多(有100多个),而有的实例消费很少,(不到10个),这种情况导致消费实例很少的mqproxy集群资源浪费,但是消费实例很多的mqproxy集群资源紧张。
  • 高级消费者:目前消费者代理采用高级消费者API进行消费,因为高级消费者在消费节点发生变化时,会产生分区再均衡,虽然我们前面一系列的操作对分区再均衡做了处理,但是当单个节点发生故障时,所有节点都会感知到,这降低了我们对系统整体稳定性的预期。
  • 不支持顺序性和事务等高级功能:由于目前代理只能支持http协议传输,所以并不能支持消息的顺序性和事务等高级功能。
  • 消费倾斜:当一个消费者代理多个分区,且QPS较高时,有几率会发生消费倾斜,虽然我们可以通过增加消费节点或者消费线程来解决这种情况,但是未能从根本消除隐患。
为了解决上述问题,我们也对mqproxy架构进行了重新设计和规划,目前部分功能已经在陆续研发中…
  • 消费者资源池模式:我们不再为每个kafka的集群来单独搭建mqproxy,而是由一个大的mqproxy集群来服务所有kafka集群,即所有mqproxy的消费实例都放入这个mqproxy资源池进行管理,当有新的消费实例加入时,我们会从资源池中找到一个负载较低的资源进行资源分配,这样做的好处是,我们没有必要为一个负载较低但是很重要的业务单独搭建一套集群。
  • parition维度的管理:每个消费者实例管理粒度从对topic进行消费降级到对单个partition维度的消费,更细粒度的管理可以使系统更稳定,资源分配更加均匀,并且能从根本上解决消费倾斜带来的风险。
  • 使用低级消费者API:高级消费者API因为支持了分区再均衡,所以当其中一个消费节点发生变更时,所有消费节点都有所感知,从而降低了系统的稳定性。低级消费者API配合分布式资源管理框架helix可以自主调控节点发生变更时的影响范围,从而降低了分区再均衡发生时的不确定性。
  • 支持长连接:为了支持消息的顺序性和事务等高级功能,http连接显然不适合当下的所有场景,支持可靠的长连接服务是完成这些功能的先决条件。
  • 多语言API支持:这是我们目前有所欠缺的,也欢迎有能力伙伴能够加入我们共同开发。



扫描下方二维码添加「好未来技术」微信官方账号
进入好未来技术官方交流群与作者实时互动~
(若扫码无效,可通过微信号TAL-111111直接添加)
- 也许你还想看 -
Vue系列之常见内存泄漏定位与解决
学而思高并发活动保障方案
未来实验A/B测的统计学原理

我知道你“在看”哟~


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存